package cn.hetra.hj212.session;

import com.google.common.collect.Lists;
import cn.hetra.hj212.core.CPS;
import cn.hetra.hj212.core.HJ212Data;
import cn.hetra.hj212.service.ExecutionContext;
import io.netty.channel.Channel;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 *  消息处理
 *  <pre>
 * 6.7 数采仪与监控中心初始化通讯流程
 *  数采仪与监控中心首次链接时，监控中心应对数采仪进行设置，具体操作如下：
 * 1) 数采仪时间校准；
 * 2) 超时数据与重发次数设置；
 * 3) 实时数据上报时间间隔设置；
 * 4) 分钟数据上报时间间隔设置；
 * 5) 实时数据是否上报设置；
 * 6) 污染治理设备运行状态是否上报设置。
 *  </pre>
 */
public class SessionImpl {
    /** Logger shared by this session and its nested classes. */
    private static final Logger LOGGER = LoggerFactory.getLogger(SessionImpl.class);
    /** Executor on which {@link #handleComplete(HJ212Data)} runs reply completion. */
    private final ThreadPoolExecutor threadPoolExecutor;

    /** Scheduler on which each request's timeout task is armed. */
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

    /**
     * Registry of in-flight requests, keyed by their request number (QN).
     *
     * <p>Backed by a {@link ConcurrentHashMap}, so the individual operations below may be
     * called from different threads without external locking.
     */
    public class ExecutionContextContainer {
        private final ConcurrentMap<String, ExecutionContextImpl> contextMap = new ConcurrentHashMap<>();

        /**
         * Registers a context under its QN.
         *
         * @param exeCtx context to register
         * @throws IllegalStateException if a context with the same QN is already registered
         */
        void add(ExecutionContextImpl exeCtx) {
            ExecutionContextImpl old = this.contextMap.putIfAbsent(exeCtx.getQN(), exeCtx);
            if (old != null) {
                throw new IllegalStateException("Duplicated invocation id:" + exeCtx.getQN());
            }
        }

        /**
         * Drops the context registered under {@code qn}; does nothing if there is none.
         *
         * @param qn request number of the context to drop
         */
        void remove(String qn) {
            this.contextMap.remove(qn);
        }

        /**
         * Looks up the context registered under {@code qn}.
         *
         * @param qn request number to look up
         * @return the registered context, or {@code null} if none is registered
         */
        public ExecutionContextImpl get(String qn) {
            return this.contextMap.get(qn);
        }
    }

    /** In-flight requests of this session, looked up by QN when a reply arrives. */
    ExecutionContextContainer executionContextContainer;
    /** Netty channel of this session: requests are written to it and its attributes supply ST/PW/MN/version. */
    final Channel channel;

    /**
     * Builds a CN=1000 command, which sets the timeout and the retransmission count.
     *
     * <p>ST, PW, MN and the protocol version are read from this session's channel attributes.
     * The QN is the current time formatted as {@code yyyyMMddHHmmssSSS}. The command
     * parameters are fixed: {@code OverTime=5} and {@code ReCount=3}.
     *
     * @return the assembled message; this method does not send it
     */
    public HJ212Data build1000() {
        // The QN is what a later reply is matched against.
        String qn = new DateTime().toString("yyyyMMddHHmmssSSS");
        return new HJ212Data.Builder()
                .QN(qn)
                .ST(channel.attr(HJ212Data.ST_ATTR).get())
                .CN("1000")
                .PW(channel.attr(HJ212Data.PW_ATTR).get())
                .MN(channel.attr(HJ212Data.MN_ATTR).get())
                .version(channel.attr(HJ212Data.VERSION_ATTR).get())
                .A(1)
                .D(0)
                .Cps(new CPS.Builder()
                        .addEntry("OverTime", "5")
                        .addEntry("ReCount", "3")
                        .build())
                .build();
    }

    /**
     * Builds a CN=1012 command, which sets the field device's time.
     *
     * <p>When setting the field-device time, a data area that contains a pollutant code means
     * the host is setting the clock of the online monitoring instrument for that pollutant;
     * a data area without a pollutant code means the host is setting the data collector's
     * clock. This method adds only a {@code SystemTime} entry and no pollutant code.
     *
     * <p>ST, PW, MN and the protocol version are read from this session's channel attributes.
     * The QN is the current time formatted as {@code yyyyMMddHHmmssSSS}; {@code SystemTime}
     * is the current time formatted as {@code yyyyMMddHHmmss}.
     *
     * @return the assembled message; this method does not send it
     */
    public HJ212Data build1012() {
        // Time-setting command: QN carries millisecond precision, SystemTime second precision.
        String qn = new DateTime().toString("yyyyMMddHHmmssSSS");
        String systemTime = new DateTime().toString("yyyyMMddHHmmss");
        return new HJ212Data.Builder()
                .QN(qn)
                .ST(channel.attr(HJ212Data.ST_ATTR).get())
                .CN("1012")
                .PW(channel.attr(HJ212Data.PW_ATTR).get())
                .MN(channel.attr(HJ212Data.MN_ATTR).get())
                .version(channel.attr(HJ212Data.VERSION_ATTR).get())
                .A(1)
                .D(0)
                .Cps(new CPS.Builder()
                        .addEntry("SystemTime", systemTime)
                        .build())
                .build();
    }

    /**
     * Creates a session bound to one channel, starting with an empty request registry.
     *
     * @param channel                     channel that requests are written to
     * @param threadPoolExecutor          executor used to run reply completion
     * @param scheduledThreadPoolExecutor scheduler used for request timeout tasks
     */
    public SessionImpl(Channel channel,
                       ThreadPoolExecutor threadPoolExecutor,
                       ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
        this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor;
        this.channel = channel;
        this.executionContextContainer = new ExecutionContextContainer();
    }

    /**
     * Registers a request under its QN and dispatches it.
     *
     * @param build request message to send
     * @return the context tracking the request
     * @throws IllegalStateException if a request with the same QN is already registered
     */
    public ExecutionContextImpl addTask(HJ212Data build) {
        ExecutionContextImpl exeCtx = new ExecutionContextImpl(build);
        executionContextContainer.add(exeCtx);
        try {
            exeCtx.execute();
        } catch (RuntimeException | Error ex) {
            // Dispatch failed (e.g. the scheduler rejected the timeout task): without this the
            // context could stay registered with no timeout task left to remove it.
            executionContextContainer.remove(exeCtx.getQN());
            throw ex;
        }
        return exeCtx;
    }

    /**
     * Starts initialization by sending the CN=1012 time-setting command.
     *
     * <p>Only this one command is sent here; the returned context is discarded, so the
     * caller does not wait for the reply.
     */
    public void startInit() {
        addTask(build1012());
    }

    /**
     * Delivers a received reply to the request it answers, matched by QN.
     *
     * <p>When a matching request is found it is removed from the registry and completed on
     * {@code threadPoolExecutor}. A reply with no matching request (for example one arriving
     * after the request timed out) is only logged.
     *
     * @param msg reply message received from the peer
     */
    public void handleComplete(HJ212Data msg) {
        String qn = msg.getQN();
        if (qn == null) {
            // The registry is a ConcurrentHashMap, which rejects null keys with an NPE.
            LOGGER.warn("Ignoring result without QN");
            return;
        }

        ExecutionContextImpl exeCtx = executionContextContainer.get(qn);
        if (exeCtx == null) {
            // Typically a reply for a request that has already timed out.
            LOGGER.warn("Connection {} receive result", qn);
            return;
        }

        // Completing the context cancels its timeout task, which is otherwise the only place
        // the entry gets removed; drop it here so finished requests do not accumulate.
        executionContextContainer.remove(qn);

        // Hand the result over on the worker pool.
        threadPoolExecutor.execute(() -> {
            try {
                exeCtx.complete(msg);
            } catch (Throwable ex) {
                LOGGER.error("Handler result exception:{}", qn, ex);
            }
        });
    }

    /**
     * Tracks one outbound request: sends it, arms a timeout, and lets a caller block in
     * {@link #get()} until a reply is recorded or the request times out.
     *
     * <p>{@code data}, {@code done} and {@code futures} are guarded by {@code lock}.
     */
    public class ExecutionContextImpl implements ExecutionContext {
        private final HJ212Data request;
        /** Scheduled tasks belonging to this request; guarded by {@code lock}. */
        List<ScheduledFuture<?>> futures = Lists.newArrayList();
        private final Lock lock = new ReentrantLock();
        /** Recorded reply, or {@code null} if none (not yet completed, or timed out); guarded by {@code lock}. */
        HJ212Data data;
        /** Set once {@link #complete(HJ212Data)} has run, with or without a reply; guarded by {@code lock}. */
        private boolean done;
        /** Ensures that only the first call to {@link #complete(HJ212Data)} takes effect. */
        private final AtomicBoolean notified;
        private final Condition doneCondition;

        /**
         * @return the request number (QN) of the request being tracked
         */
        public String getQN() {
            return request.getQN();
        }

        /** Timeout in milliseconds, used both for the timeout task and for waiting in {@link #get()}. */
        long timeout = 10000;

        private ExecutionContextImpl(HJ212Data request) {
            this.notified = new AtomicBoolean(false);
            this.request = request;
            this.doneCondition = this.lock.newCondition();
        }

        /**
         * Records the outcome, wakes every waiter and cancels the pending scheduled tasks.
         * Only the first call has any effect.
         *
         * @param result the reply, or {@code null} when completing because of a timeout
         */
        private void complete(HJ212Data result) {
            if (!this.notified.compareAndSet(false, true)) {
                return;
            }
            this.lock.lock();
            try {
                this.data = result;
                this.done = true;
                // signalAll: more than one thread may be blocked in get().
                this.doneCondition.signalAll();
                for (ScheduledFuture<?> pending : this.futures) {
                    if (!pending.isDone()) {
                        pending.cancel(false);
                    }
                }
                this.futures.clear();
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Schedules a task on the session's scheduler and remembers its future so that
         * {@link #complete(HJ212Data)} can cancel it.
         */
        private void schedule(Runnable command,
                              long delay,
                              TimeUnit unit) {
            ScheduledFuture<?> future = scheduledThreadPoolExecutor.schedule(command, delay, unit);
            this.lock.lock();
            try {
                this.futures.add(future);
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Arms the timeout task, then writes the request to the channel.
         *
         * <p>When the timeout fires, the context is removed from the registry and completed
         * with no reply.
         */
        public void execute() {
            schedule(() -> {
                        // Timeout handling
                        executionContextContainer.remove(request.getQN());
                        complete(null);
                    }
                    , timeout, TimeUnit.MILLISECONDS
            );
            channel.writeAndFlush(request);
        }

        /**
         * @return {@code true} once a reply has been recorded; stays {@code false} for a
         *         request that timed out without a reply
         */
        boolean isDone() {
            this.lock.lock();
            try {
                return null != this.data;
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Waits up to {@code timeout} milliseconds for the request to complete and reports
         * whether the reply signals success.
         *
         * @return {@code true} if the reply's {@code QnRtn} entry equals {@code "1"},
         *         {@code false} for any other reply
         * @throws TimeoutException if no reply has been recorded, because the request timed
         *                          out, the wait elapsed, or the waiting thread was interrupted
         */
        @Override
        public boolean get() throws TimeoutException {
            HJ212Data reply;
            this.lock.lock();
            try {
                long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
                // Re-check the flag under the lock on every wake-up, so a completion that
                // happened before we started waiting, or a spurious wake-up, is handled.
                while (!this.done && remainingNanos > 0L) {
                    try {
                        remainingNanos = this.doneCondition.awaitNanos(remainingNanos);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        LOGGER.warn("Current thread is interrupted.");
                        break;
                    }
                }
                reply = this.data;
            } finally {
                // Always release: a lock left held here would block complete() and other waiters.
                this.lock.unlock();
            }
            return Optional.ofNullable(reply)
                    .map(t -> "1".equals(t.getCps().new Parser().parse().getInfos().get("QnRtn")))
                    .orElseThrow(() -> new TimeoutException(getQN()));
        }


    }
    /**
     * Closes the channel this session is bound to.
     */
    public void close() {
        this.channel.close();
    }


    /**
     * Builds a command with the given command number and parameters, then registers and
     * sends it through {@link #addTask(HJ212Data)}.
     *
     * <p>ST, PW, MN and the protocol version are read from this session's channel attributes;
     * the QN is the current time formatted as {@code yyyyMMddHHmmssSSS}.
     *
     * @param CN  command number to send
     * @param cps command parameters
     * @return the context tracking the sent request
     */
    public ExecutionContext send(String CN, CPS cps) {
        String qn = new DateTime().toString("yyyyMMddHHmmssSSS");
        HJ212Data message = new HJ212Data.Builder()
                .QN(qn)
                .ST(channel.attr(HJ212Data.ST_ATTR).get())
                .CN(CN)
                .PW(channel.attr(HJ212Data.PW_ATTR).get())
                .MN(channel.attr(HJ212Data.MN_ATTR).get())
                .version(channel.attr(HJ212Data.VERSION_ATTR).get())
                .A(1)
                .D(0)
                .Cps(cps)
                .build();
        return addTask(message);
    }

}
